"""Glue code to handle BitTorrent stuff.  Most of this comes from download.py
in the BitTorrent library.
"""
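
# Rough usage sketch (illustrative only; the real call sites live elsewhere
# in the download daemon):
#
#     download = TorrentDownload(torrent_data, save_path)
#     download.set_status_callback(status_func)
#     download.start()
#     ...
#     resume_data = download.shutdown()  # feed back in as fast_resume_data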

from urlparse import urljoin
from binascii import b2a_hex
from sha import sha
from os import path, makedirs
from socket import error as socketerror
from random import seed
from threading import Thread, Event, Lock
from time import time
from Queue import Queue, Empty
try:
    from os import getpid
except ImportError:
    def getpid():
        return 1

from BitTorrent.bitfield import Bitfield
from BitTorrent.btformats import check_message
from BitTorrent.Choker import Choker
from BitTorrent.Storage import Storage
from BitTorrent.StorageWrapper import StorageWrapper
from BitTorrent.Uploader import Upload
from BitTorrent.Downloader import Downloader
from BitTorrent.Connecter import Connecter
from BitTorrent.Encrypter import Encoder
from BitTorrent.RawServer import RawServer
from BitTorrent.Rerequester import Rerequester
from BitTorrent.DownloaderFeedback import DownloaderFeedback
from BitTorrent.RateMeasure import RateMeasure
from BitTorrent.CurrentRateMeasure import Measure
from BitTorrent.PiecePicker import PiecePicker
from BitTorrent.bencode import bencode, bdecode
from BitTorrent.download import defaults
from BitTorrent import version
from natpunch import UPnP_test, UPnP_open_port, UPnP_close_port

import util
import config as dtv_config
import prefs

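# Start from the BitTorrent library's own defaults, then apply the few
# overrides the download daemon cares about.  storage_lock serializes the
# disk-heavy Storage/StorageWrapper setup across simultaneous torrents;
# upnp_type caches the result of the quick UPnP probe done at import time.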
config = {}
for key, default, description in defaults:
    config[key] = default
config['report_hash_failures'] = True
storage_lock = Lock()
upnp_type = UPnP_test(1) # fast method

downloader_count = util.ThreadSafeCounter()

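# Split the configured upstream cap evenly across every running torrent
# download (see the FIXME below about how crude this is).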
def calc_max_upload_rate():
    # FIXME: this code to limit the rate for multiple downloaders is fairly
    # dubious.  If some of the downloaders use less than their share of upload
    # bandwidth, we should give it to others.
    total_rate = int(dtv_config.get(prefs.UPSTREAM_LIMIT_IN_KBS) * 1024)
    downloaders = downloader_count.getvalue()
    if downloaders != 0:
        return total_rate / downloaders
    else:
        return 0

class TorrentDownload:
    def __init__(self, torrent_data, download_to, fast_resume_data=None):
        """Create a new torrent.  torrent_data is the contents of a torrent
        file/url.  download_to is the file/directory to save the torrent to.
        fast_resume_data is data used to quickly restart the torrent; it's
        returned by the shutdown() method.
        """

        self.doneflag = Event()
        self.finflag = Event()
        self.torrent_data = torrent_data
        self.download_to = download_to
        self.fast_resume_data = fast_resume_data
        self.fast_resume_queue = Queue()
        self.rawserver = RawServer(self.doneflag,
                config['timeout_check_interval'], config['timeout'],
                errorfunc=self.on_error, maxconnects=config['max_allow_in'])
        self.thread = None
        self.current_status = {}
        self.status_callback = None
        # We set time_est_func to a real function in download().  For now use
        # a placeholder function.
        self.time_est_func = lambda: 0
        self.last_up_total = self.last_down_total = 0.0
        self.last_activity = None
        self.rawserver_started = False
        self.minport = dtv_config.get(prefs.BT_MIN_PORT)
        self.maxport = dtv_config.get(prefs.BT_MAX_PORT)

    def start(self):
        """Start downloading the torrent."""
        self.thread = Thread(target=self.download_thread)
        filename = path.basename(self.download_to)
        self.thread.setName("BitTorrent Downloader - %s" % util.stringify(filename, "replace"))
        self.thread.start()

    def shutdown(self):
        """Stop downloading the torrent.

        Returns a string that can be used as fast resume data.
        """

        self.doneflag.set()
        self.rawserver.wakeup()
        if self.rawserver_started:
            try:
                return self.fast_resume_queue.get(timeout=10)
            except Empty:
                return None
        else:
            return self.fast_resume_data

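    # Fast resume data is the bencoded dict built in download()'s shutdown
    # path: 'already_got' holds the have-bitfield and 'mtimes' maps each file
    # to its modification time when the torrent was shut down.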
    def parse_fast_resume_data(self, total_pieces):
        already_got = None
        mtimes = {}
        if self.fast_resume_data is not None:
            try:
                fast_resume = bdecode(self.fast_resume_data)
                already_got = fast_resume['already_got']
                mtimes = fast_resume['mtimes']
            except:
                import traceback
                print "WARNING: ERROR parsing fast resume data"
                traceback.print_exc(1)
                self.fast_resume_data = None
        try:
            self.pieces_already_got = Bitfield(total_pieces, already_got)
        except:
            print "Failed to load resume data"
            self.pieces_already_got = Bitfield(total_pieces, None)
        self.fast_resume_mtimes = mtimes

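    # Handed to StorageWrapper so it can skip hash-checking pieces we already
    # had at shutdown, provided none of the files it passes in have been
    # modified since the fast resume data was written.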
    def skip_hash_check(self, index, files):
        if not self.pieces_already_got[index]:
            return False
        for f in files:
            mtimes_key = f.encode('utf-8')
            if path.getmtime(f) > self.fast_resume_mtimes.get(mtimes_key, 0):
                return False
        return True

    def set_status_callback(self, func):
        """Register a callback function.  func will be called whenever the
        torrent download status changes and periodically while the torrent
        downloads.  It will be passed a dict with the following keys:

        activity -- string specifying what's currently happening, or None
                during normal operation
        upRate -- upload rate in B/s
        downRate -- download rate in B/s
        upTotal -- total MB uploaded (this run)
        downTotal -- total MB downloaded (this run)
        fractionDone -- what portion of the download is completed
        timeEst -- estimated completion time, in seconds
        totalSize -- total size of the torrent in bytes
        """
        self.status_callback = func

    def on_error(self, message):
        print "WARNING BitTorrent error: ", message

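    # Normalize the status dicts coming from the BitTorrent internals
    # (StorageWrapper while checking existing data, DownloaderFeedback while
    # running) into the shape documented in set_status_callback(), falling
    # back to the last values we saw for anything missing.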
    def on_status(self, status_dict):
        status = {
            'upRate': status_dict.get('upRate', 0),
            'downRate': status_dict.get('downRate', 0),
            'upTotal': status_dict.get('upTotal', self.last_up_total),
            'downTotal': status_dict.get('downTotal', self.last_down_total),
            'timeEst': self.time_est_func(),
            'totalSize': self.total_size,
        }

        if status['timeEst'] is None:
            status['timeEst'] = 0
        if self.finflag.isSet():
            status['fractionDone'] = 1.0
        else:
            status['fractionDone'] = status_dict.get('fractionDone', 0.0)
        if status['downRate'] > 0 or status['upRate'] > 0:
            status['activity'] = None
        else:
            status['activity'] = status_dict.get('activity',
                    self.last_activity)

        self.last_up_total = status['upTotal']
        self.last_down_total = status['downTotal']
        self.last_activity = status['activity']
        self.status_callback(status)

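    # Re-check the shared upload cap every 5 seconds so this torrent's limit
    # adjusts as other downloads start and stop.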
    def update_max_upload_rate(self):
        current_rate = calc_max_upload_rate()
        if current_rate != self.max_upload_rate:
            self.connecter.change_max_upload_rate(current_rate)
            self.max_upload_rate = current_rate
        self.rawserver.add_task(self.update_max_upload_rate, 5)

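    # Matches the filefunc callback interface used by BitTorrent.download:
    # asked where to save the torrent, we record the total size and always
    # answer download_to.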
    def filefunc(self, file, length, saveas, isdir):
        self.total_size = length
        return self.download_to

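    # Body of the downloader thread; keeps the global downloader count
    # accurate so calc_max_upload_rate() divides bandwidth over the right
    # number of downloads.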
    def download_thread(self):
        downloader_count.inc()
        try:
            self.download()
        finally:
            downloader_count.dec()

    def download(self):
        # Basically copied from the download() function in
        # BitTorrent.download.  Modified slightly to work with Democracy.
        spewflag = Event()
        try:
            response = bdecode(self.torrent_data)
            check_message(response)
        except ValueError, e:
            self.on_error("got bad file info - " + str(e))
            return

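        # Work out the on-disk layout.  A single-file torrent ('length' in
        # info) saves straight to download_to; a multi-file torrent treats
        # download_to as a directory and recreates the torrent's file tree
        # under it.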
        try:
            def make(f, forcedir = False):
                if not forcedir:
                    f = path.split(f)[0]
                if f != '' and not path.exists(f):
                    makedirs(f)

            info = response['info']
            if info.has_key('length'):
                file_length = info['length']
                file = self.filefunc(info['name'], file_length,
                        config['saveas'], False)
                if file is None:
                    return
                make(file)
                files = [(file, file_length)]
            else:
                file_length = 0
                for x in info['files']:
                    file_length += x['length']
                file = self.filefunc(info['name'], file_length,
                        config['saveas'], True)
                if file is None:
                    return

                make(file, True)

                files = []
                for x in info['files']:
                    n = file
                    for i in x['path']:
                        n = path.join(n, i)
                    files.append((n, x['length']))
                    make(n)
        except OSError, e:
            self.on_error("Couldn't allocate dir - " + str(e))
            return

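        # Build our 20-byte peer id: 'M' plus the library version (dots
        # replaced with dashes), padded to 8 characters, followed by 12 hex
        # digits hashed from the current time and pid.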
        finflag = self.finflag
        ann = [None]
        myid = 'M' + version.replace('.', '-')
        myid = myid + ('-' * (8 - len(myid))) + b2a_hex(sha(repr(time()) + ' ' + str(getpid())).digest()[-6:])
        seed(myid)
        pieces = [info['pieces'][x:x+20] for x in xrange(0,
            len(info['pieces']), 20)]
        self.parse_fast_resume_data(len(pieces))
        def failed(reason):
            self.doneflag.set()
            if reason is not None:
                self.on_error(reason)
        rawserver = self.rawserver
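        # Setting up Storage/StorageWrapper scans existing files (and may
        # hash-check them), so take the module-wide lock to keep concurrent
        # torrents from doing this at the same time.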
        storage_lock.acquire()
        try:
            try:
                try:
                    storage = Storage(files, open, path.exists, path.getsize)
                except IOError, e:
                    self.on_error('trouble accessing files - ' + str(e))
                    return
                def finished(finflag = finflag, ann = ann, storage = storage):
                    finflag.set()
                    try:
                        storage.set_readonly()
                    except (IOError, OSError), e:
                        self.on_error('trouble setting readonly at end - ' + str(e))
                    if ann[0] is not None:
                        ann[0](1)
                rm = [None]
                def data_flunked(amount, rm = rm, report_hash_failures = config['report_hash_failures']):
                    if rm[0] is not None:
                        rm[0](amount)
                    if report_hash_failures:
                        self.on_error('a piece failed hash check, re-downloading it')
                storagewrapper = StorageWrapper(storage,
                        config['download_slice_size'], pieces,
                        info['piece length'], finished, failed, self.on_status,
                        self.doneflag, config['check_hashes'], data_flunked,
                        self.skip_hash_check)
            except ValueError, e:
                failed('bad data - ' + str(e))
            except IOError, e:
                failed('IOError - ' + str(e))
        finally:
            storage_lock.release()
        if self.doneflag.isSet():
            return

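        # Bind the first free port in [BT_MIN_PORT, BT_MAX_PORT].  The
        # pre-set message is what we report if the range is empty; the for
        # loop's else clause only runs when no port could be bound.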
        e = 'maxport less than minport - no ports to check'

        for listen_port in xrange(self.minport, self.maxport + 1):
            try:
                rawserver.bind(listen_port, config['bind'])
                break
            except socketerror, e:
                pass
        else:
            self.on_error("Couldn't listen - " + str(e))
            return
        if upnp_type:
            upnp_active = UPnP_open_port(listen_port)
            if not upnp_active:
                print "WARNING: can't open port with UPnP"
        else:
            upnp_active = 0

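        # Wire together the standard BitTorrent pipeline -- choker, rate
        # measures, piece picker, downloader, upload factory, connecter and
        # encoder -- mirroring what BitTorrent.download does internally.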
        choker = Choker(config['max_uploads'], rawserver.add_task, finflag.isSet,
            config['min_uploads'])
        upmeasure = Measure(config['max_rate_period'],
            config['upload_rate_fudge'])
        downmeasure = Measure(config['max_rate_period'])
        def make_upload(connection, choker = choker,
                storagewrapper = storagewrapper,
                max_slice_length = config['max_slice_length'],
                max_rate_period = config['max_rate_period'],
                fudge = config['upload_rate_fudge']):
            return Upload(connection, choker, storagewrapper,
                max_slice_length, max_rate_period, fudge)
        ratemeasure = RateMeasure(storagewrapper.get_amount_left())
        self.time_est_func = ratemeasure.get_time_left
        rm[0] = ratemeasure.data_rejected
        picker = PiecePicker(len(pieces), config['rarest_first_cutoff'])
        for i in xrange(len(pieces)):
            if storagewrapper.do_I_have(i):
                picker.complete(i)
        downloader = Downloader(storagewrapper, picker,
            config['request_backlog'], config['max_rate_period'],
            len(pieces), downmeasure, config['snub_time'],
            ratemeasure.data_came_in)
        self.max_upload_rate = calc_max_upload_rate()
        connecter = Connecter(make_upload, downloader, choker,
            len(pieces), upmeasure, self.max_upload_rate, rawserver.add_task)
        self.connecter = connecter
        infohash = sha(bencode(info)).digest()
        encoder = Encoder(connecter, rawserver,
            myid, config['max_message_length'], rawserver.add_task,
            config['keepalive_interval'], infohash, config['max_initiate'])
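        # The Rerequester handles tracker announces; ann[0] is filled in
        # below so the finished() callback can notify the tracker when the
        # download completes.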
        rerequest = Rerequester(response['announce'],
                config['rerequest_interval'], rawserver.add_task,
                connecter.how_many_connections, config['min_peers'],
                encoder.start_connection, rawserver.add_task,
                storagewrapper.get_amount_left, upmeasure.get_total,
                downmeasure.get_total, listen_port, config['ip'], myid,
                infohash, config['http_timeout'], self.on_error,
                config['max_initiate'], self.doneflag, upmeasure.get_rate,
                downmeasure.get_rate, encoder.ever_got_incoming)
        if config['spew']:
            spewflag.set()
        DownloaderFeedback(choker, rawserver.add_task, self.on_status,
            upmeasure.get_rate, downmeasure.get_rate,
            upmeasure.get_total, downmeasure.get_total, ratemeasure.get_time_left,
            ratemeasure.get_size_left, file_length, finflag,
            config['display_interval'], spewflag)
        self.on_status({"activity" : 'connecting to peers'})
        ann[0] = rerequest.announce
        rerequest.begin()
        self.rawserver.add_task(self.update_max_upload_rate, 5)
        self.rawserver_started = True
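        # listen_forever() blocks until doneflag is set.  On the way out,
        # bencode the have-list plus current file mtimes and hand them to
        # shutdown() through fast_resume_queue, then send the tracker one
        # final announce.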
        try:
            rawserver.listen_forever(encoder)
        finally:
            try:
                fast_resume_data = {
                    'already_got': storagewrapper.get_have_list(),
                    'mtimes': dict([(f, long(path.getmtime(f))) for \
                            f, size in files]),
                }
                self.fast_resume_queue.put(bencode(fast_resume_data))
            except:
                self.fast_resume_queue.put(None)
                raise
        storage.close()
        if upnp_active:
            UPnP_close_port(listen_port)
        rerequest.announce(2)